Handle multiple pods to prevent KubernetesJobOperator failures with parallelism option
#49899
hi @potiuk!
hi @shahar1!
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py
65676cc to 435b41b
df96d8e to a650584
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
@Crowiant / @VladaZakharova - could you please respond to @steinwaywhw's comments? Thank you!
2946155 to 21af9e6
Hello @steinwaywhw, can you please review my answers to your comments? Thank you!
@hussein-awala, @shahar1, @jedcunningham can someone with review and write access please review these changes? 🙏
shahar1
left a comment
Apologies for the delay - I re-reviewed it, almost there IMO :)
I'd welcome additional reviews, preferably from people who use or maintain the Kubernetes operators.
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py
providers/google/src/airflow/providers/google/cloud/triggers/kubernetes_engine.py
@Crowiant could you please address the issues raised in shahar1's review when you get a chance? Thank you :)
21af9e6 to e5391a9
shahar1
left a comment
LGTM with one small comment,
I'd appreciate an additional review before merging.
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
…h parallelism option
e5391a9 to 9dc7042
@shahar1 |
Closes: #44994
According to #44994, KubernetesJobOperator has been failing when the parallelism option is used since version 8.4.1.
Reason: KubernetesJobOperator inherits from KubernetesPodOperator and therefore uses some of the parent's methods.
One of these, get_or_create_pod, expects to find at most one pod during execution. If it finds two or more pods, it raises the exception 'More than one pod found'. That is appropriate for KubernetesPodOperator, but KubernetesJobOperator can use more than one pod during execution.
This PR therefore adds a new method, get_pods, which better fits the logic of this operator. It also introduces a new operator attribute, self.pods, which is needed to handle the do_xcom_push and get_logs flags.
Change KubernetesJobTrigger to handle KubernetesJobOperator with the parallelism option and the deferrable flag.
Change GKEStartJobOperator.execute_deferrable to handle multiple pods.
Change GKEJobTrigger to handle multiple pods.
Adjust the system test example_kubernetes_engine_job.py in the Google provider to reflect the changes.
Adjust and add unit tests in the cncf/kubernetes and google providers.
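To illustrate the difference between the single-pod lookup and the multi-pod lookup described above, here is a minimal, self-contained sketch. It is not the code from this PR: the `Pod` dataclass and the functions `find_single_pod` and `find_all_pods` are hypothetical stand-ins for get_or_create_pod and get_pods, and no Airflow or Kubernetes client calls are involved.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Pod:
    """Hypothetical stand-in for a Kubernetes pod object."""

    name: str
    job_name: str


def find_single_pod(pods: list[Pod], job_name: str) -> Pod | None:
    """Single-pod lookup: tolerates zero or one match, rejects more."""
    matches = [p for p in pods if p.job_name == job_name]
    if len(matches) > 1:
        # Mirrors the error reported for jobs that use parallelism.
        raise RuntimeError("More than one pod found")
    return matches[0] if matches else None


def find_all_pods(pods: list[Pod], job_name: str) -> list[Pod]:
    """Multi-pod lookup: returns every pod that belongs to the job."""
    return [p for p in pods if p.job_name == job_name]


# A job started with parallelism=2 owns two pods.
cluster = [
    Pod("job-a-1", "job-a"),
    Pod("job-a-2", "job-a"),
    Pod("job-b-1", "job-b"),
]

pods = find_all_pods(cluster, "job-a")
print([p.name for p in pods])

try:
    find_single_pod(cluster, "job-a")
except RuntimeError as err:
    print(f"single-pod lookup failed: {err}")
```

With a list such as `pods` available, per-pod work like collecting logs or XCom results can be done by looping over it, which is presumably the role self.pods plays for the get_logs and do_xcom_push flags.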